package cn.spream.jstudy.kafka.consumer.service;


import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Created with IntelliJ IDEA.
 * User: sjx
 * Date: 15-9-8
 * Time: 下午2:22
 * To change this template use File | Settings | File Templates.
 */
public class KafkaMessageConsumer {

    private String topic;
    private int topicCount;
    private ConsumerConfig consumerConfig;

    public void receive(Callback callback) {
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, topicCount);
        Decoder keyDecoder = new StringDecoder(new VerifiableProperties());
        Decoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> kafkaStreamsMap = consumerConnector.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        List<KafkaStream<String, String>> kafkaStreams = kafkaStreamsMap.get(topic);
        for (KafkaStream<String, String> kafkaStream : kafkaStreams) {
            ConsumerIterator<String, String> consumerIterator = kafkaStream.iterator();
            while (consumerIterator.hasNext()) {
                MessageAndMetadata<String, String> messageAndMetadata = consumerIterator.next();
                callback.onMessage(messageAndMetadata.message());
            }
        }
    }

    public interface Callback {

        public void onMessage(String message);

    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public void setTopicCount(int topicCount) {
        this.topicCount = topicCount;
    }

    public void setConsumerConfig(ConsumerConfig consumerConfig) {
        this.consumerConfig = consumerConfig;
    }
}
